"""Adapted from https://github.com/huggingface/transformers/tree/main/tests//models/qwen3/test_modeling_qwen3.py."""

# tests/models/qwen3/test_modeling_qwen3.py
import inspect

import numpy as np
import pytest
import torch
import transformers

import mindspore as ms

from tests.modeling_test_utils import (
    MS_DTYPE_MAPPING,
    PT_DTYPE_MAPPING,
    compute_diffs,
    generalized_parse_args,
    get_modules,
)
from tests.transformers_tests.models.modeling_common import ids_numpy

# Upper bound on the PyTorch-vs-MindSpore output difference tolerated for each precision label.
DTYPE_AND_THRESHOLDS = {"fp32": 5e-4, "fp16": 5e-3, "bf16": 5e-2}
# Values passed to ms.set_context(mode=...); in MindSpore 0 is GRAPH_MODE and 1 is PYNATIVE_MODE.
MODES = [0, 1]

# Compare (major, minor) numerically. A plain string comparison is lexicographic, so
# "4.9.0" >= "4.51.0" would be True and "4.100.0" >= "4.51.0" would be False.
# Only the first two components are parsed, so suffixes such as ".dev0" or "rc1" are ignored.
if tuple(int(part) for part in transformers.__version__.split(".")[:2]) >= (4, 51):
    from transformers import Qwen3Config

    class Qwen3ModelTester:
        """Holds the hyper-parameters of a small Qwen3 model and builds its config and inputs.

        Every constructor argument is stored unchanged as an attribute of the same name.
        ``get_config`` forwards the model hyper-parameters to ``config_class``, and
        ``prepare_config_and_inputs`` pairs that config with token ids and an optional mask.
        """

        config_class = Qwen3Config

        def __init__(
            self,
            batch_size=13,
            seq_length=7,
            is_training=True,
            use_input_mask=True,
            vocab_size=99,
            hidden_size=32,
            num_hidden_layers=2,
            num_attention_heads=4,
            num_key_value_heads=2,
            intermediate_size=37,
            hidden_act="silu",
            max_position_embeddings=512,
            initializer_range=0.02,
            pad_token_id=0,
            rms_norm_eps=1e-6,
        ):
            # Batch geometry and input toggles.
            self.batch_size = batch_size
            self.seq_length = seq_length
            self.use_input_mask = use_input_mask
            self.is_training = is_training

            # Hyper-parameters that get_config() hands to the config class.
            self.vocab_size = vocab_size
            self.hidden_size = hidden_size
            self.intermediate_size = intermediate_size
            self.num_hidden_layers = num_hidden_layers
            self.num_attention_heads = num_attention_heads
            self.num_key_value_heads = num_key_value_heads
            self.hidden_act = hidden_act
            self.max_position_embeddings = max_position_embeddings
            self.initializer_range = initializer_range
            self.rms_norm_eps = rms_norm_eps
            self.pad_token_id = pad_token_id

            # Width of one attention head; kept on the tester only, not passed to the config.
            self.head_dim = hidden_size // num_attention_heads

        def prepare_config_and_inputs(self):
            """Return ``(config, input_ids, input_mask)``; the mask is None when masking is disabled."""
            input_ids = ids_numpy([self.batch_size, self.seq_length], self.vocab_size)

            # Lower-triangular array of ones shaped like input_ids.
            input_mask = np.tril(np.ones_like(input_ids)) if self.use_input_mask else None

            config = self.get_config()
            # Force eager attention because flash-attention is not supported for torch on CPU.
            config._attn_implementation = "eager"

            return config, input_ids, input_mask

        def get_config(self):
            """Instantiate ``config_class`` from the stored hyper-parameters, with cache and sliding window off."""
            forwarded_fields = (
                "vocab_size",
                "hidden_size",
                "num_hidden_layers",
                "num_attention_heads",
                "num_key_value_heads",
                "intermediate_size",
                "hidden_act",
                "max_position_embeddings",
                "initializer_range",
                "pad_token_id",
                "rms_norm_eps",
            )
            config_kwargs = {field: getattr(self, field) for field in forwarded_fields}
            return self.config_class(use_cache=False, sliding_window=None, **config_kwargs)

    # Shared tester instance; the config and inputs are built once at import time.
    model_tester = Qwen3ModelTester()
    config, input_ids, input_mask = model_tester.prepare_config_and_inputs()

    # Each case lists, in order: name, pt_module, ms_module, init_args, init_kwargs,
    # inputs_args, inputs_kwargs, outputs_map. The dtype and mode columns are appended
    # when the cases are expanded for parametrization.
    LLAMA_CASES = [
        [
            "Qwen3Model",
            "transformers.Qwen3ForCausalLM",
            "mindone.transformers.Qwen3ForCausalLM",
            (config,),
            {},
            (input_ids,),
            dict(attention_mask=input_mask),
            dict(logits=0),  # key: torch attribute, value: mindspore idx
        ],
    ]

    @pytest.mark.parametrize(
        "name,pt_module,ms_module,init_args,init_kwargs,inputs_args,inputs_kwargs,outputs_map,dtype,mode",
        [[*case, dtype, mode] for case in LLAMA_CASES for dtype in DTYPE_AND_THRESHOLDS for mode in MODES],
    )
    def test_named_modules(
        name,
        pt_module,
        ms_module,
        init_args,
        init_kwargs,
        inputs_args,
        inputs_kwargs,
        outputs_map,
        dtype,
        mode,
    ):
        """Run the PyTorch and MindSpore modules on the same inputs and bound their output difference.

        ``outputs_map`` maps an attribute name on the PyTorch output to an index into the
        MindSpore output; when it is empty the two raw outputs are compared directly.
        The allowed difference is looked up in ``DTYPE_AND_THRESHOLDS`` by ``ms_dtype``.
        """
        ms.set_context(mode=mode)

        pt_model, ms_model, pt_dtype, ms_dtype = get_modules(pt_module, ms_module, dtype, *init_args, **init_kwargs)
        parsed_inputs = generalized_parse_args(pt_dtype, ms_dtype, *inputs_args, **inputs_kwargs)
        pt_inputs_args, pt_inputs_kwargs, ms_inputs_args, ms_inputs_kwargs = parsed_inputs

        # Only pass hidden_dtype when the PyTorch forward signature declares it.
        if "hidden_dtype" in inspect.signature(pt_model.forward).parameters:
            pt_inputs_kwargs["hidden_dtype"] = PT_DTYPE_MAPPING[pt_dtype]
            ms_inputs_kwargs["hidden_dtype"] = MS_DTYPE_MAPPING[ms_dtype]

        with torch.no_grad():
            pt_outputs = pt_model(*pt_inputs_args, **pt_inputs_kwargs)
        ms_outputs = ms_model(*ms_inputs_args, **ms_inputs_kwargs)

        if not outputs_map:
            diffs = compute_diffs(pt_outputs, ms_outputs)
        else:
            pt_selected = []
            ms_selected = []
            for torch_attr, ms_position in outputs_map.items():
                pt_value = getattr(pt_outputs, torch_attr)
                ms_value = ms_outputs[ms_position]
                if isinstance(pt_value, (list, tuple)):
                    # Sequence outputs are flattened so both lists stay aligned element by element.
                    pt_selected.extend(pt_value)
                    ms_selected.extend(ms_value)
                else:
                    pt_selected.append(pt_value)
                    ms_selected.append(ms_value)
            diffs = compute_diffs(pt_selected, ms_selected)

        threshold = DTYPE_AND_THRESHOLDS[ms_dtype]
        assert (np.array(diffs) < threshold).all(), (
            f"ms_dtype: {ms_dtype}, pt_type:{pt_dtype}, "
            f"Outputs({np.array(diffs).tolist()}) has diff bigger than {threshold}"
        )
